package org.springblade.thingsphere.network.components.mqtt.server;

import cn.hutool.core.net.NetUtil;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import io.vertx.mqtt.MqttTopicSubscription;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springblade.core.log.exception.ServiceException;
import org.springblade.thingsphere.CompositeProtocolSupport;
import org.springblade.thingsphere.domain.Message;
import org.springblade.thingsphere.manage.pojo.entity.GatewayEntity;
import org.springblade.thingsphere.manage.service.IGatewayService;
import org.springblade.thingsphere.message.DeviceMessageCodec;
import org.springblade.thingsphere.network.components.NetworkProvider;
import org.springblade.thingsphere.network.components.NetworkType;
import org.springblade.thingsphere.network.components.bus.event.MqttServerMsgEvent;
import org.springblade.thingsphere.network.components.common.MqttServerConfig;
import org.springblade.thingsphere.network.protocol.impl.ProtocolAssetSupplier;
import org.springblade.thingsphere.type.DefaultTransport;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author lhb
 * @date 2024/9/26 下午2:16
 */
@Slf4j
@Component
public class VertxMqttServer implements NetworkProvider<MqttServerConfig, MqttServer> {

	private final static Map<String, List<MqttEndpoint>> topicSubscribers = new ConcurrentHashMap<>();
	//存储每个topic的订阅关系
	private final static Map<MqttEndpoint, List<String>> subscriptions = new ConcurrentHashMap<>();

	private final static Map<Long, MqttServer> MQTT_SERVER_MAP = new ConcurrentHashMap<>();

	@Resource
	private ApplicationEventPublisher applicationEventPublisher;
	@Resource
	private IGatewayService gatewayService;
	@Resource
	private ProtocolAssetSupplier protocolAssetSupplier;
	@Resource
	private Vertx vertx;

	@Override
	public NetworkType getType() {
		return NetworkType.MQTT_SERVER;
	}

	@Override
	public MqttServer createNetwork(MqttServerConfig config) {
		log.info("【 Load the Mqtt Server network component 】config:{}", config);
		if (!NetUtil.isUsableLocalPort(config.getPort())) {
			throw new ServiceException("port unavailable");
		}

		MqttServer item = MQTT_SERVER_MAP.remove(config.getId());
		if (null != item) {
			item.close();
		}

		MqttServerOptions options = new MqttServerOptions();
		options.setHost(config.getHost());
		options.setPort(config.getPort());
		MqttServer mqttServer = MqttServer.create(vertx, options);

		mqttServer.endpointHandler(endpoint -> {
			//使用lambda表达式实现了endpointHandler方法，传入参数endpoint；
			log.info("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());
			if (null == endpoint.auth()) {
				endpoint.close();
				log.info("endpoint.auth() 为空");
				return;
			}

			log.info("[username = " + endpoint.auth().getUsername() + ", password = " + endpoint.auth().getPassword() + "]");
			if (StringUtils.isNotBlank(config.getUserName()) && !config.getUserName().equals(endpoint.auth().getUsername())) {
				log.error("The account is incorrect");
				endpoint.close();
				return;
			}
			if (StringUtils.isNotBlank(config.getPassword()) && !config.getPassword().equals(endpoint.auth().getPassword())) {
				log.error("The password is incorrect");
				endpoint.close();
				return;
			}

			endpoint.accept();
			// 处理订阅
			subscribeHandle(endpoint);
			// 处理取消订阅
			unSubscribeHandle(endpoint);
			// 接收处理
			receiveHandle(endpoint, config);
			// 断开连接句柄
			disConnectHandle(endpoint);
		});
		// 开启监听
		mqttServer.listen(handler -> {
			if (handler.succeeded()) {
				log.info("MQTT服务器正在端口上侦听 " + handler.result().actualPort());
			} else {
				log.error("启动服务器时出错" + handler.cause().getMessage());
			}
		}).exceptionHandler(e -> {
			log.error("listen exceptionHandler", e);
		});
		MQTT_SERVER_MAP.put(config.getId(), mqttServer);
		return mqttServer;
	}

	@Override
	public void shutdown(Long id) {
		MqttServer mqttServer = MQTT_SERVER_MAP.remove(id);
		if (null != mqttServer) {
			mqttServer.close();
		}
	}

	/*
	 * 设备断开处理
	 * */
	private void disConnectHandle(MqttEndpoint endpoint) {
		endpoint.disconnectHandler(disconnectMessage -> {
			log.info("Received disconnect from client:{}, reason code: {}", endpoint.auth().getUsername(), disconnectMessage);

			List<String> topics = subscriptions.get(endpoint);
			for (String topic : topics) {
				topicSubscribers.get(topic).remove(endpoint);
				log.info("disConnectHandle [" + topic + "]");
			}
			subscriptions.remove(endpoint);
		});

	}

	/**
	 * 处理订阅消息
	 */
	private void subscribeHandle(MqttEndpoint endpoint) {

		endpoint.subscribeHandler(subscribe -> {
			Boolean IsValidTopic = false;

			//存储订阅消息中要订阅的topic的列表
			List<MqttTopicSubscription> topicSubscriptions = subscribe.topicSubscriptions();

			//存储订阅topic Qos级别的列表
			List<MqttQoS> reasonCodes = new ArrayList<>();

			//遍历列表
			for (MqttTopicSubscription s : topicSubscriptions) {
				//topic
				String topic = s.topicName();
				//Qos级别
				MqttQoS qos = s.qualityOfService();

				//判断topic是否合法
				if (!isValidTopic(topic)) {
					//不合法则向设备发送消息
					endpoint.publish(topic, Buffer.buffer("非法topic，topic不可包含空格"), qos, false, false);
					continue;
				} else {
					IsValidTopic = true;
				}

				log.info("Subscription for " + topic + " with QoS " + qos);
				reasonCodes.add(qos);
				//判断是否已有此topic，如果有则直接添加，没有则新建键值对
				if (!topicSubscribers.containsKey(topic)) {
					topicSubscribers.put(topic, new ArrayList<MqttEndpoint>());
				}
				topicSubscribers.get(topic).add(endpoint);
				//同上
				if (!subscriptions.containsKey(endpoint)) {
					subscriptions.put(endpoint, new ArrayList<String>());
				}
				subscriptions.get(endpoint).add(topic);
			}

			if (IsValidTopic) {
				endpoint.subscribeAcknowledge(subscribe.messageId(), reasonCodes);
			}
		});

	}

	/*
	 * 处理退订
	 * */
	private void unSubscribeHandle(MqttEndpoint endpoint) {
		endpoint.unsubscribeHandler(unsubscribe -> {
			log.info("退订:{}", unsubscribe.toString());
			//遍历要退订的topic
			for (String unsubscribedTopic : unsubscribe.topics()) {

				topicSubscribers.get(unsubscribedTopic).remove(endpoint);

				//如果某topic的订阅列表为空，删除topic
				if (topicSubscribers.get(unsubscribedTopic).size() == 0) {
					topicSubscribers.remove(unsubscribedTopic);
				}

				subscriptions.get(endpoint).remove(unsubscribedTopic);

				//同上
				if (subscriptions.get(endpoint).size() == 0) {
					subscriptions.remove(endpoint);
				}

				log.debug("unsubscribed :" + endpoint.auth().getUsername() + "for" + unsubscribedTopic);
			}
			endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
		});
	}

	/**
	 * 接收处理
	 *
	 * @param endpoint
	 * @param config
	 */
	private void receiveHandle(MqttEndpoint endpoint, MqttServerConfig config) {
		endpoint.publishHandler(publish -> {

			try {
				String topic = publish.topicName();
				Buffer payload = publish.payload();

				log.info("Received message topic:{}  payload: {} with QoS: {}", topic, payload.toString(Charset.defaultCharset()), publish.qosLevel());

				//对topic的合法性进行判断
				if (!isValidTopic(topic)) {
					endpoint.publish(topic, Buffer.buffer("非法topic，topic不可包含空格"), MqttQoS.AT_MOST_ONCE, false, false);
					return;
				}

				if (publish.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
					endpoint.publishAcknowledge(publish.messageId());
				} else if (publish.qosLevel() == MqttQoS.EXACTLY_ONCE) {
					endpoint.publishReceived(publish.messageId());
				}

				// 上报属性topic
				if (topic.equals(config.getReportPropertyPath())) {
					// 查询网络组件对应的网关
					GatewayEntity gateway = gatewayService.dictByNetworkId(config.getId());
					if (null == gateway || 0 == gateway.getStatus()) {
						log.info("The corresponding gateway was not found");
						return;
					}
					// 调用协议包解析请求的内容
					CompositeProtocolSupport compositeProtocolSupport = protocolAssetSupplier.getProtocolSupportById(gateway.getProtocolId());
					if (null == compositeProtocolSupport) {
						log.info("mqtt protocol package handler not found");
						return;
					}
					DeviceMessageCodec deviceMessageCodec = compositeProtocolSupport.getMessageCodecSupports().get(DefaultTransport.MQTT.getId());
					if (null == deviceMessageCodec) {
						log.info("mqtt codec is not implemented in the protocol package");
						return;
					}
					List<Message> decoder = deviceMessageCodec.decoder(payload.toString(Charset.defaultCharset()));
					// 消息存储到bus
					applicationEventPublisher.publishEvent(new MqttServerMsgEvent<>(this, decoder));
				}
			} catch (Exception e) {
				log.error(e.getMessage(), e);
			}

//			//遍历订阅关系，进行消息发布
//			for (Map.Entry<String, List<MqttEndpoint>> entry : topicSubscribers.entrySet()) {
//				String subscribedTopic = entry.getKey();
//				//被订阅的topic
//				List<MqttEndpoint> subscribers = entry.getValue();
//				//订阅上方topic的订阅者
//
//				//判断消息发布的topic是否能和被设备订阅的topic按照规则匹配
//				if (isTopicMatch(subscribedTopic, topic)) {
//					//若匹配，则遍历topic订阅者列表，并进行消息发布
//					for (MqttEndpoint subscriber : subscribers) {
//						subscriber.publish(topic, payload, publish.qosLevel(), publish.isDup(), publish.isRetain());
//					}
//				}
//			}
		});
		endpoint.publishAcknowledgeHandler(messageId -> {
				log.info("发布确认处理程序:{}", messageId);
			})
			.publishReceivedHandler(messageId -> {
				log.info("发布接收到的处理程序:{}", messageId);
				endpoint.publishRelease(messageId);
			})
			.publishCompletionHandler(messageId -> {
				log.info("发布完成处理程序:{}", messageId);
			});
		endpoint.publishReleaseHandler(endpoint::publishComplete);
	}

	/*
	 * 判断topic是否匹配
	 * */
	private boolean isTopicMatch(String subscribedTopic, String publishedTopic) {
		String[] publishTopicArray = publishedTopic.split("/");
		String[] subscribedTopicArray = subscribedTopic.split("/");
		//将两个要比较的topic分割

		//订阅的topic长度不能比发布的topic长一个以上
		if (subscribedTopicArray.length - 1 > publishTopicArray.length) {
			return false;
		}

		//如果发布的topic长度比订阅的topic长度要长
		//并且订阅的topic最后不是以#结尾都返回false，因为这不可能
		if (subscribedTopicArray.length < publishTopicArray.length) {
			if (!subscribedTopicArray[subscribedTopicArray.length - 1].equals("#")) {
				return false;
			}
		}

		//对两个topic进行比较
		for (int i = 0; i < publishTopicArray.length && i < subscribedTopicArray.length; i++) {
			//如果匹配成功或者匹配到了+，进行下一层匹配
			if (subscribedTopicArray[i].equals(publishTopicArray[i]) || subscribedTopicArray[i].equals("+")) {
				continue;
			}

			//如果匹配到了#，直接通过
			if (subscribedTopicArray[i].equals("#")) {
				break;
			}
			return false;
		}
		return true;
	}

	public boolean isValidTopic(String topic) {
		//topic 不能包含任何空格，并且要么以 /# 结尾，要么不包含 #
		return (!topic.matches(".*\\s+.*")) && (topic.matches(".*(?:\\/#)?$"));
	}

}
